<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements.  See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN"
    "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<meta name="generator" content=
"HTML Tidy for Windows (vers 25 March 2009), see www.w3.org" />
<title>Flume Developer Guide</title>

<style type="text/css">
/*<![CDATA[*/
ol{margin:0;padding:0}p{margin:0}.c6{vertical-align:top;width:468pt;border-style:solid;background-color:#d9d9d9;border-color:#000000;border-width:1pt;padding:5pt 5pt 5pt 5pt}.c11{list-style-type:disc;margin:0;padding:0}.c17{max-width:468pt;background-color:#ffffff;padding:72pt 72pt 72pt 72pt}.c18{color:#666666;font-size:12pt;font-weight:bold}.c7{color:#1155cc;text-decoration:underline}.c12{padding-left:0pt;margin-left:36pt}.c1{text-align:justify;direction:ltr}.c0{color:inherit;text-decoration:inherit}.c5{font-size:36pt;font-weight:bold}.c16{margin-left:13.5pt}.c4{line-height:1.0}.c10{border-collapse:collapse}.c3{direction:ltr}.c8{height:11pt}.c15{height:0pt}.c2{font-family:Courier New}.c14{font-style:italic}.c13{height:12pt}.c9{font-size:8pt}.title{padding-top:24pt;line-height:1.15;text-align:left;color:#000000;font-size:36pt;font-family:Arial;font-weight:bold;padding-bottom:6pt}.subtitle{padding-top:18pt;line-height:1.15;text-align:left;color:#666666;font-style:italic;font-size:24pt;font-family:Georgia;padding-bottom:4pt}body{color:#000000;font-size:11pt;font-family:Arial}h1{padding-top:24pt;line-height:1.15;text-align:left;color:#000000;font-size:18pt;font-family:Arial;font-weight:bold;padding-bottom:6pt}h2{padding-top:18pt;line-height:1.15;text-align:left;color:#000000;font-size:14pt;font-family:Arial;font-weight:bold;padding-bottom:4pt}h3{padding-top:14pt;line-height:1.15;text-align:left;color:#666666;font-size:12pt;font-family:Arial;font-weight:bold;padding-bottom:4pt}h4{padding-top:12pt;line-height:1.15;text-align:left;color:#666666;font-style:italic;font-size:11pt;font-family:Arial;padding-bottom:2pt}h5{padding-top:11pt;line-height:1.15;text-align:left;color:#666666;font-size:10pt;font-family:Arial;font-weight:bold;padding-bottom:2pt}h6{padding-top:10pt;line-height:1.15;text-align:left;color:#666666;font-style:italic;font-size:10pt;font-family:Arial;padding-bottom:2pt}
/*]]>*/
</style>
</head>
<body class="c17">
<p class="c1"><span class="c5">Flume 1.x Developer Guide</span></p>
<h1 class="c1"><span>Introduction</span></h1>
<h2 class="c1"><span>Overview</span></h2>
<p class="c1"><span>Apache Flume is a distributed, reliable, and
available system for efficiently collecting, aggregating and moving
large amounts of log data from many different sources to a
centralized data store.</span></p>
<p class="c1"><span>At the moment Flume is an incubating Apache
project. Two release codelines are currently distributed: versions
0.9.x and 1.x.x. This guide is specific to 1.x (more specifically,
the 1.1.0 release). The 0.9.x developer guide is available</span>
<span class="c7"><a class="c0" href=
"http://archive.cloudera.com/cdh/3/flume/DeveloperGuide/index.html">
here</a></span><span>.</span></p>
<h2 class="c1"><span>Architecture</span></h2>
<h3 class="c3"><span>Data flow model</span></h3>
<p class="c1"><span>The unit of data flow is called an event: a
byte payload accompanied by an optional set of string
attributes. A Flume agent is a (JVM) process that hosts the
components through which events flow from an external source to the
next destination.</span></p>
<p class="c1"><img height="191" src="images/DevGuide_image00.png" width=
"458" /></p>
<p class="c1"><span>A source consumes events delivered to it by an
external source, such as a web server, in a specific format. For example,
an Avro source can be used to receive Avro events from clients or
other agents in the flow. When a source receives an event, it
stores it in one or more channels. The channel is a passive
store that keeps the event until it is consumed by a sink. One example
of a channel is the JDBC channel, which uses a file-system-backed
embedded database. The sink removes the event from the channel and
either puts it into an external repository such as HDFS or forwards it
to the source at the next hop of the flow. The source and sink within a
given agent run asynchronously, with the events staged in the
channel.</span></p>
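<p class="c1"><span>The flow described above can be pictured with a
small in-memory model. This is only a sketch using the Java standard
library: SketchEvent, SketchChannel and DataFlowSketch are hypothetical
names invented for illustration and are not Flume classes.</span></p>

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical stand-in for an event: a byte payload plus optional string attributes.
final class SketchEvent {
  final byte[] body;
  final Map<String, String> headers;

  SketchEvent(byte[] body, Map<String, String> headers) {
    this.body = body.clone();
    this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
  }
}

// Hypothetical passive store: it only holds events until a sink removes them.
final class SketchChannel {
  private final Queue<SketchEvent> queue = new ArrayDeque<>();

  synchronized void put(SketchEvent e) { queue.add(e); }
  synchronized SketchEvent take() { return queue.poll(); } // null when empty
  synchronized int size() { return queue.size(); }
}

final class DataFlowSketch {
  // "Source" side: wrap incoming text in an event and stage it in the channel.
  static void receive(SketchChannel ch, String line, String host) {
    Map<String, String> headers = new HashMap<>();
    headers.put("host", host);
    ch.put(new SketchEvent(line.getBytes(StandardCharsets.UTF_8), headers));
  }

  // "Sink" side: drain the channel into a stand-in repository (a list).
  static List<String> drain(SketchChannel ch) {
    List<String> repository = new ArrayList<>();
    SketchEvent e;
    while ((e = ch.take()) != null) {
      repository.add(new String(e.body, StandardCharsets.UTF_8));
    }
    return repository;
  }
}
```

<p class="c1"><span>In this model the source side and the sink side
never call each other. They only share the channel, which is what lets
them run asynchronously.</span></p>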
<p class="c3 c8"></p>
<h3 class="c1"><span>Reliability</span></h3>
<p class="c1"><span>The events are staged in the channel on each
agent. They are then delivered to the next agent or to the terminal
repository (such as HDFS) in the flow. The events are removed from a
channel only after they are stored in the channel of the next agent or
in the terminal repository. This is how the single-hop message
delivery semantics in Flume provide end-to-end reliability of the
flow. Flume uses a transactional approach to guarantee reliable
delivery of the events. The sources and sinks encapsulate the
storage and retrieval of the events in a transaction provided by the
channel. This ensures that the set of events is reliably passed
from point to point in the flow. In a multi-hop flow, the
sink of the previous hop and the source of the next hop both have their
transactions running to ensure that the data is safely stored in
the channel of the next hop.</span></p>
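<p class="c1"><span>The single-hop hand-off can be sketched as
follows. This is a simplified, single-threaded model written against the
Java standard library only. TxChannelSketch and HopSketch are
hypothetical names, events are plain strings, and the linkUp flag is an
assumed way to simulate a delivery failure. It is not how Flume's
channels are implemented.</span></p>

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical in-memory channel with one implicit transaction, for illustration only.
final class TxChannelSketch {
  private final Deque<String> committed = new ArrayDeque<>();
  private final List<String> pendingPuts = new ArrayList<>();
  private final List<String> pendingTakes = new ArrayList<>();

  // Stage an event; it becomes visible only on commit().
  void put(String event) { pendingPuts.add(event); }

  // Remove the next event but remember it so that rollback() can restore it.
  String take() {
    String event = committed.pollFirst();
    if (event != null) {
      pendingTakes.add(event);
    }
    return event;
  }

  // Make staged puts visible and forget taken events for good.
  void commit() {
    committed.addAll(pendingPuts);
    pendingPuts.clear();
    pendingTakes.clear();
  }

  // Discard staged puts and return taken events to the head of the queue in their original order.
  void rollback() {
    pendingPuts.clear();
    for (int i = pendingTakes.size() - 1; i >= 0; i--) {
      committed.addFirst(pendingTakes.get(i));
    }
    pendingTakes.clear();
  }

  int size() { return committed.size(); }
}

final class HopSketch {
  // Move one event from the upstream channel to the downstream channel.
  // The upstream take is committed only after the downstream put has committed.
  static boolean forward(TxChannelSketch upstream, TxChannelSketch downstream, boolean linkUp) {
    String event = upstream.take();
    if (event == null) {
      upstream.commit(); // nothing to move
      return false;
    }
    if (!linkUp) {       // simulated delivery failure
      upstream.rollback();
      return false;
    }
    downstream.put(event);
    downstream.commit();
    upstream.commit();
    return true;
  }
}
```

<p class="c1"><span>When the simulated link is down, the event is
rolled back into the upstream channel, so it is never lost between
hops.</span></p>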
<p class="c3 c8"></p>
<h1 class="c3"><a name="h.d2n7p5e1gfyr" id=
"h.d2n7p5e1gfyr"></a><span>Building Flume</span></h1>
<h3 class="c3"><a name="h.9u9zonoit4cm" id=
"h.9u9zonoit4cm"></a><span>Getting the source</span></h3>
<p class="c1"><span>Check out the code using SVN. The repository's
root lives</span> <span class="c7"><a class="c0"
href=
"http://svn.apache.org/repos/asf/incubator/flume/">here</a></span><span>.
Flume 1.x development happens on the 'trunk' branch, so the following
command line can be used:</span></p>
<p class="c3 c16"><span class="c2">svn co</span> <span class=
"c7 c2"><a class="c0" href=
"http://svn.apache.org/repos/asf/incubator/flume/trunk">http://svn.apache.org/repos/asf/incubator/flume/trunk</a></span><span class="c2">&#160;
flume-1.1</span><span><br /></span></p>
<p class="c1"><span>Alternatively, if you prefer using Git:</span></p>
<p class="c1 c16"><span class="c2">git clone</span> <span class=
"c2"><a class="c0" href=
"">git://git.apache.org/flume.git</a></span><span><br /></span></p>
<h3 class="c3"><a name="h.hw4kly53i5xn" id=
"h.hw4kly53i5xn"></a><span>Compile/test Flume</span></h3>
<p class="c3"><span>The Flume build is</span> <span>mavenized. You can
build it with standard Maven commands:</span></p>
<ol class="c11" start="1">
<li class="c3 c12"><span>Compile only :</span>
<span>&#160;</span><span class="c2">mvn clean compile</span></li>
<li class="c12 c3"><span>Compile and test :</span> <span class=
"c2">mvn clean test</span></li>
<li class="c12 c3"><span>Run individual tests :</span> <span class=
"c2">mvn -Dtest=&lt;Test-1&gt;,&lt;Test-2&gt;,...&lt;Test-n&gt;
test</span> <span>&#160;</span></li>
<li class="c12 c3"><span>Package :</span> <span class="c2">mvn
clean package</span></li>
</ol>
<h3 class="c3 c13"><a name="h.jlnhs9y1uth4" id=
"h.jlnhs9y1uth4"></a></h3>
<p class="c3 c8"></p>
<h2 class="c3"><a name="h.x5myn7j60bg6" id=
"h.x5myn7j60bg6"></a><span>Developing custom components</span></h2>
<h3 class="c3"><a name="h.fluk2lo8cj0b" id=
"h.fluk2lo8cj0b"></a><span>Client</span></h3>
<p class="c1"><span>The client operates at the point of origin of
events and delivers them to a Flume agent. Clients typically
operate in the process space of the application they are
consuming data from. Currently Flume supports Avro, log4j and
syslog as ways to transfer data from a remote source. Additionally,
there&#8217;s an Exec source that can consume the output of a local
process as input to Flume.</span></p>
<p class="c1"><span>It&#8217;s quite possible to have a use case
where these existing options are not sufficient. In this case you
can build a custom mechanism to send data to Flume. There are two
ways of achieving this. The first is to create a custom client that
communicates with one of Flume&#8217;s existing sources, such as Avro
or syslog. Here the client should convert its data into
messages understood by these Flume sources. The other option is to
write a custom Flume source that talks directly to your existing
client application using some IPC or RPC protocol, and then
converts the data into Flume events to send them upstream.</span></p>
<p class="c1 c8"></p>
<p class="c1"><span class="c18">ClientSDK</span></p>
<p class="c1"><span>Though Flume contains a number of built-in
mechanisms to ingest data, one often wants the ability to
communicate with Flume directly from a custom application. The
ClientSDK is a library that enables applications
to connect to Flume and send data into Flume&#8217;s data
flow.</span></p>
<h4 class="c1"><a name="h.deqy7ul46had" id=
"h.deqy7ul46had"></a><span>RPC Client interface</span></h4>
<p class="c1"><span>This is an interface that wraps the user data
and attributes into an event, which is Flume&#8217;s unit of flow. It
encapsulates the RPC mechanism supported by Flume sources. The
application can simply call append() or appendBatch() to send data
and not worry about the underlying message exchanges.</span></p>
<h4 class="c1"><a name="h.9u9rilu8b1pm" id=
"h.9u9rilu8b1pm"></a><span>Avro rpc client</span></h4>
<p class="c1"><span>As of Flume 1.1.0, Avro is the only supported RPC
protocol. The</span> <span class=
"c14">NettyAvroRpcClient</span><span>&#160;implements the RpcClient
interface using Avro RPC. The client needs to create this
object with the host and port of the Flume agent and use it to send data
into Flume. The following example shows how to use the ClientSDK
APIs:</span></p>
<p class="c1 c8"></p>
<table cellpadding="0" cellspacing="0" class="c10">
<tbody>
<tr>
<td class="c6">
<p class="c1 c4"><span class="c2">import
org.apache.flume.Event;</span></p>
<p class="c1 c4"><span class="c2">import
org.apache.flume.EventDeliveryException;</span></p>
<p class="c1 c4"><span class="c2">import
org.apache.flume.FlumeException;</span></p>
<p class="c1 c4"><span class="c2">import
org.apache.flume.api.RpcClient;</span></p>
<p class="c1 c4"><span class="c2">import
org.apache.flume.api.RpcClientFactory;</span></p>
<p class="c1 c4"><span class="c2">import
org.apache.flume.event.EventBuilder;</span></p>
<p class="c1 c4"><span class="c2">import
java.nio.charset.Charset;</span></p>
<p class="c1 c4 c8"></p>
<p class="c1 c4"><span class="c2">&#8230;</span></p>
<p class="c1 c4"><span class="c2">&#8230;</span></p>
<p class="c1 c4 c8"></p>
<p class="c1 c4"><span class="c2">public void myInit ()
{</span></p>
<p class="c1 c4"><span class="c2">&#8230;</span></p>
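<p class="c1 c4"><span class="c2">// set up the RPC connection to the
Flume agent at hostname/port</span></p>
<p class="c1 c4"><span class="c2">// (rpcClient is assumed to be a field,
so that the methods below can use it)</span></p>
<p class="c1 c4"><span class="c2">rpcClient =
RpcClientFactory.getDefaultInstance(hostname, port);</span></p>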
<p class="c1 c4"><span class="c2">&#8230;</span></p>
<p class="c1 c4"><span class="c2">}</span></p>
<p class="c1 c4 c8"></p>
<p class="c1 c4"><span class="c2">public void
sendDataToFlume(String data) {</span></p>
<p class="c1 c4"><span class="c2">&#8230;</span></p>
<p class="c1 c4"><span class="c2">// Create flume event
object</span></p>
<p class="c1 c4"><span class="c2">Event event =
EventBuilder.withBody(data, Charset.forName("UTF8"));</span></p>
<p class="c1 c4"><span class="c2">try {</span></p>
<p class="c1 c4"><span class="c2">&#160;
rpcClient.append(event);</span></p>
<p class="c1 c4"><span class="c2">} catch (EventDeliveryException
e) {</span></p>
<p class="c1 c4"><span class="c2">&#160; // oops, failed to send
data; handle the failure here &#8230;</span></p>
<p class="c1 c4"><span class="c2">}</span></p>
<p class="c1 c4"><span class="c2">&#8230;</span></p>
<p class="c1 c4"><span class="c2">}</span></p>
<p class="c1 c4 c8"></p>
<p class="c1 c4"><span class="c2">public void cleanUp ()
{</span></p>
<p class="c1 c4"><span class="c2">&#8230;</span></p>
<p class="c1 c4"><span class="c2">// close the rpc
connection</span></p>
<p class="c1 c4"><span class="c2">rpcClient.close();</span></p>
<p class="c1 c4"><span class="c2">&#8230;</span></p>
<p class="c1 c4"><span class="c2">}</span></p>
</td>
</tr>
</tbody>
</table>
<p class="c1 c8"></p>
<h4 class="c1"><a name="h.w4shqiv4k64n" id=
"h.w4shqiv4k64n"></a><span>Failover handler</span></h4>
<p class="c3"><span>This class wraps the Avro RPC client to provide
failover handling capability to clients. It takes a list of
host/port pairs of Flume agents. If there&#8217;s an error in
communicating with the current agent, it automatically falls back
to the next agent in the list.</span></p>
<p class="c3 c8"></p>
<table cellpadding="0" cellspacing="0" class="c10">
<tbody>
<tr>
<td class="c6">
<p class="c4 c3"><span class=
"c2">&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;// Setup
properties for the failover</span></p>
<p class="c4 c3"><span class=
"c2">&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;Properties
props = new Properties();</span></p>
<p class="c4 c3"><span class=
"c2">&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;props.put("client.type",
"default_failover");</span></p>
<p class="c4 c3 c8"></p>
<p class="c4 c3"><span class=
"c2">&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;// list of
hosts</span></p>
<p class="c4 c3"><span class=
"c2">&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;props.put("hosts",
"host1 host2 host3");</span></p>
<p class="c4 c3 c8"></p>
<p class="c4 c3"><span class=
"c2">&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;//
address/port pair for each host</span></p>
<p class="c4 c3"><span class=
"c2">&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;props.put("hosts.host1",
host1 + ":" + port1);</span></p>
<p class="c4 c3"><span class=
"c2">&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;props.put("hosts.host2",
host2 + ":" + port2);</span></p>
<p class="c4 c3"><span class=
"c2">&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;props.put("hosts.host3",
host3 + ":" + port3);</span></p>
<p class="c4 c3 c8"></p>
<p class="c4 c3"><span class=
"c2">&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;// create the
client with failover properties</span></p>
<p class="c4 c3"><span class=
"c2">&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;client =
(FailoverRpcClient)</span></p>
<p class="c4 c3"><span class=
"c2">&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;RpcClientFactory.getInstance(props);</span></p>
</td>
</tr>
</tbody>
</table>
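<p class="c3"><span>The fall-back behaviour described above can be
sketched in plain Java. This is not the FailoverRpcClient
implementation. SketchTransport and FailoverSketch are hypothetical
names, and the policy of trying each agent once per call is an
assumption made for the illustration.</span></p>

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical transport: sends one message to one "host:port" address, throwing on failure.
interface SketchTransport {
  void send(String address, String message) throws Exception;
}

// Illustration of failover only; this is not Flume's FailoverRpcClient.
final class FailoverSketch {
  private final List<String> addresses;
  private final SketchTransport transport;
  private int current = 0; // index of the agent currently in use

  FailoverSketch(List<String> addresses, SketchTransport transport) {
    this.addresses = new ArrayList<>(addresses);
    this.transport = transport;
  }

  // Try the current agent first, then each remaining agent once, in list order.
  // Returns the address that accepted the message.
  String append(String message) {
    Exception last = null;
    for (int attempt = 0; attempt < addresses.size(); attempt++) {
      String address = addresses.get(current);
      try {
        transport.send(address, message);
        return address; // delivered; stay on this agent for the next call
      } catch (Exception e) {
        last = e;
        current = (current + 1) % addresses.size(); // fall back to the next agent
      }
    }
    throw new IllegalStateException("all agents failed", last);
  }
}
```

<p class="c3"><span>Once a fall-back has happened, this sketch keeps
using the agent that worked instead of returning to the first one. A
real client may choose a different policy.</span></p>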
<p class="c3 c8"></p>
<p class="c3 c8"></p>
<h3 class="c3"><a name="h.3otzzn1v852e" id=
"h.3otzzn1v852e"></a><span>Transaction interface</span></h3>
<p class="c1"><span>The transaction interface is the basis of
reliability for Flume. All the major components, i.e. sources, sinks
and channels, need to interface with a Flume transaction.</span></p>
<p class="c1"><img height="329" src="images/DevGuide_image01.png" width=
"500" /></p>
<p class="c1"><span>The transaction interface is implemented by a
channel implementation. The source and sink connected to a channel
obtain a transaction object from it. Sources actually use a channel
selector interface that encapsulates the transaction (discussed in
later sections). The operations that stage or extract an event are
done inside an active transaction. For example:</span></p>
<p class="c1 c8"></p>
<table cellpadding="0" cellspacing="0" class="c10">
<tbody>
<tr>
<td class="c6">
<p class="c1"><span class="c2">&#160;Channel ch = ...</span></p>
<p class="c1"><span class="c2">&#160;Transaction tx =
ch.getTransaction();</span></p>
<p class="c1"><span class="c2">&#160;try {</span></p>
<p class="c1"><span class="c2">&#160; &#160;tx.begin();</span></p>
<p class="c1"><span class="c2">&#160; &#160;...</span></p>
<p class="c1"><span class="c2">&#160; &#160;// ch.put(event) or
ch.take()</span></p>
<p class="c1"><span class="c2">&#160; &#160;...</span></p>
<p class="c1"><span class="c2">&#160; &#160;tx.commit();</span></p>
<p class="c1"><span class="c2">&#160;} catch (ChannelException ex)
{</span></p>
<p class="c1"><span class="c2">&#160;
&#160;tx.rollback();</span></p>
<p class="c1"><span class="c2">&#160; ...</span></p>
<p class="c1"><span class="c2">} finally {</span></p>
<p class="c1"><span class="c2">&#160; tx.close();</span></p>
<p class="c1"><span class="c2">}</span></p>
</td>
</tr>
</tbody>
</table>
<p class="c1 c8"></p>
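<p class="c1"><span>Here we get hold of a transaction from a
channel. After the begin method is executed, the event is put in
the channel and the transaction is committed. If the channel throws an
exception, the transaction is rolled back, and it is closed in either
case.</span></p>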
<h3 class="c3"><a name="h.ihgon9tt60tk" id=
"h.ihgon9tt60tk"></a><span>Sink</span></h3>
<p class="c1"><span>The purpose of a sink is to extract events from
the channel and forward them to the next agent in the flow or store
them in an external repository. A sink is linked to a channel instance
as per the flow configuration. A sink runner thread
is created for every configured sink and manages
the sink&#8217;s lifecycle. The sink needs to implement the start() and
stop() methods that are part of the LifecycleAware interface. The
start() method should initialize the sink and bring it to a state
where it can forward events to its next destination. The
process() method from the Sink interface should do the core
processing of extracting the event from the channel and forwarding it.
The stop() method should do the necessary cleanup. The sink also
needs to implement the Configurable interface for processing its own
configuration settings.</span></p>
<p class="c1 c8"></p>
<table cellpadding="0" cellspacing="0" class="c10">
<tbody>
<tr class="c15">
<td class="c6">
<p class="c1"><span class="c2">// foo sink</span></p>
<p class="c1"><span class="c2">public class foo</span><span class=
"c2">Sink extends AbstractSink implements Configurable {</span></p>
<p class="c1"><span class="c2">&#160; @Override</span></p>
<p class="c1"><span class="c2">&#160; public void configure(Context
context) {</span></p>
<p class="c1"><span class=
"c2">&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;some_Param =
context.get("some_param", String.class);</span></p>
<p class="c1"><span class="c2">&#160; &#160; &#160;// process
some_param &#8230;</span></p>
<p class="c1"><span class="c2">&#160; }</span></p>
<p class="c1 c8"></p>
<p class="c1"><span class="c2">&#160; @Override</span></p>
<p class="c1"><span class="c2">&#160; public void start()
{</span></p>
<p class="c1"><span class="c2">&#160; &#160; // initialize the
connection to foo repository ..</span></p>
<p class="c1"><span class="c2">&#160; }</span></p>
<p class="c1 c8"></p>
<p class="c1"><span class="c2">&#160; @Override</span></p>
<p class="c1"><span class="c2">&#160; public void stop ()
{</span></p>
<p class="c1"><span class="c2">&#160; &#160; // cleanup and
disconnect from foo repository ..</span></p>
<p class="c1"><span class="c2">&#160; }</span></p>
<p class="c1 c8"></p>
<p class="c1"><span class="c2">&#160; @Override</span></p>
<p class="c1"><span class="c2">&#160; public Status process()
throws EventDeliveryException {</span></p>
<p class="c1"><span class="c2">&#160; &#160; // Start
transaction</span></p>
<p class="c1"><span class="c2">&#160; &#160; Status status =
null;</span></p>
<p class="c1"><span class="c2">&#160; &#160; Channel ch =
getChannel();</span></p>
<p class="c1"><span class="c2">&#160; &#160; Transaction tx =
ch.getTransaction();</span></p>
<p class="c1"><span class="c2">&#160; &#160; try {</span></p>
<p class="c1"><span class="c2">&#160; &#160; &#160;
tx.begin();</span></p>
<p class="c1"><span class="c2">&#160; &#160; &#160; Event e =
ch.take();</span></p>
<p class="c1"><span class="c2">&#160; &#160; &#160; // send the
event to foo</span></p>
<p class="c1"><span class="c2">&#160; &#160; &#160; //
foo.some_operation(e);</span></p>
<p class="c1"><span class="c2">&#160; &#160; &#160;
tx.commit();</span></p>
<p class="c1"><span class="c2">&#160; &#160; &#160; status =
Status.READY;</span></p>
<p class="c1"><span class="c2">&#160; &#160; } catch (ChannelException ex)
{</span></p>
<p class="c1"><span class="c2">&#160;
&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;tx.rollback();</span></p>
<p class="c1"><span class=
"c2">&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;status =
Status.BACKOFF;</span></p>
<p class="c1"><span class="c2">&#160; &#160; } finally {</span></p>
<p class="c1"><span class="c2">&#160; &#160; &#160;
tx.close();</span></p>
<p class="c1"><span class="c2">&#160; &#160; }</span></p>
<p class="c1"><span class="c2">&#160; &#160; return
status;</span></p>
<p class="c1"><span class="c2">&#160; }</span></p>
<p class="c1"><span class="c2">}</span></p>
</td>
</tr>
</tbody>
</table>
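<p class="c1"><span>To show how a runner thread might react to the
status returned by process(), here is a minimal sketch. The names
SketchStatus, SketchSink and RunnerSketch are hypothetical, and the
linear back-off policy with a cap is an assumption. The policy of the
actual sink runner may differ. Instead of sleeping, the sketch records
the pause it would take after each call, which keeps it easy to
test.</span></p>

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mirror of the two status values used in the sink example.
enum SketchStatus { READY, BACKOFF }

// Hypothetical sink contract: one call processes at most one unit of work.
interface SketchSink {
  SketchStatus process();
}

final class RunnerSketch {
  // Call process() exactly maxCalls times and record the pause chosen after each call.
  // The pause grows linearly while the sink keeps reporting BACKOFF, is capped at
  // maxMillis, and resets to zero as soon as the sink reports READY.
  static List<Long> run(SketchSink sink, int maxCalls, long stepMillis, long maxMillis) {
    List<Long> pauses = new ArrayList<>();
    int consecutiveBackoffs = 0;
    for (int i = 0; i < maxCalls; i++) {
      if (sink.process() == SketchStatus.BACKOFF) {
        consecutiveBackoffs++;
        pauses.add(Math.min((long) consecutiveBackoffs * stepMillis, maxMillis));
      } else {
        consecutiveBackoffs = 0;
        pauses.add(0L);
      }
    }
    return pauses;
  }
}
```

<p class="c1"><span>Returning BACKOFF when the channel is empty or the
destination is unavailable lets a runner of this kind avoid spinning on
a sink that has nothing to do.</span></p>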
<p class="c1 c8"></p>
<h3 class="c3"><a name="h.1pnussqxjqbs" id=
"h.1pnussqxjqbs"></a><span>Source</span></h3>
<p class="c1"><span>The purpose of a source is to receive data from
an external client and store it in the channel. As mentioned above,
for a source the transaction interface is encapsulated by the channel
selector. Similar to the sink runner, a source runner
thread is created for every configured source and
manages the source&#8217;s lifecycle. The source needs to implement the
start() and stop() methods that are part of the LifecycleAware
interface. There are two types of sources, pollable and
event-driven. The runner of a pollable source invokes the
process() method of the pollable source. The process() method
should check for new data and store it in the channel. An
event-driven source needs to have its own callback mechanism that
captures the new data.</span></p>
<p class="c1 c8"></p>
<table cellpadding="0" cellspacing="0" class="c10">
<tbody>
<tr class="c15">
<td class="c6">
<p class="c3"><span class="c2">// bar source</span></p>
<p class="c3"><span class="c2">public class barSource extends
AbstractSource implements Configurable,
PollableSource {</span></p>
<p class="c3"><span class="c2">&#160; @Override</span></p>
<p class="c3"><span class="c2">&#160; public void configure(Context
context) {</span></p>
<p class="c3"><span class=
"c2">&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;some_Param =
context.get("some_param", String.class);</span></p>
<p class="c3"><span class="c2">&#160; &#160; &#160;// process
some_param &#8230;</span></p>
<p class="c3"><span class="c2">&#160; }</span></p>
<p class="c3 c8"></p>
<p class="c3"><span class="c2">&#160; @Override</span></p>
<p class="c3"><span class="c2">&#160; public void start()
{</span></p>
<p class="c3"><span class="c2">&#160; &#160; // initialize the
connection to bar client ..</span></p>
<p class="c3"><span class="c2">&#160; }</span></p>
<p class="c3 c8"></p>
<p class="c3"><span class="c2">&#160; @Override</span></p>
<p class="c3"><span class="c2">&#160; public void stop ()
{</span></p>
<p class="c3"><span class="c2">&#160; &#160; // cleanup and
disconnect from bar client ..</span></p>
<p class="c3"><span class="c2">&#160; }</span></p>
<p class="c3 c8"></p>
<p class="c3"><span class="c2">&#160; @Override</span></p>
<p class="c3"><span class="c2">&#160; public Status process()
throws EventDeliveryException {</span></p>
<p class="c3 c8"></p>
<p class="c3"><span class=
"c2">&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;try
{</span></p>
<p class="c3"><span class="c2">&#160; &#160; &#160;// receive new
data</span></p>
<p class="c3"><span class="c2">&#160; &#160; &#160;Event e =
get_some_data();</span></p>
<p class="c3 c8"></p>
<p class="c3"><span class="c2">&#160; &#160; &#160;// store the
event to underlying channels(s)</span></p>
<p class="c3"><span class="c2">&#160;
&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;getChannelProcessor().processEvent(e);</span></p>
<p class="c3"><span class=
"c2">&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;} catch
(ChannelException ex) {</span></p>
<p class="c3"><span class="c2">&#160;
&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;return
Status.BACKOFF;</span></p>
<p class="c3"><span class=
"c2">&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;}</span></p>
<p class="c3 c8"></p>
<p class="c3"><span class=
"c2">&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;return
Status.READY;</span></p>
<p class="c3"><span class="c2">&#160; }</span></p>
<p class="c3 c8"></p>
<p class="c3"><span class="c2">}</span></p>
</td>
</tr>
</tbody>
</table>
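<p class="c1"><span>The difference between the two source types can be
sketched with the Java standard library alone. PollableSourceSketch and
EventDrivenSourceSketch are hypothetical names. The external system is
modelled as a queue and the channel as a list, which is an assumption
made to keep the example self-contained.</span></p>

```java
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Pollable style: a runner calls poll() repeatedly and the source checks for new data each time.
final class PollableSourceSketch {
  private final Queue<String> external; // stand-in for the external system
  private final List<String> channel;   // stand-in for the channel

  PollableSourceSketch(Queue<String> external, List<String> channel) {
    this.external = external;
    this.channel = channel;
  }

  // Returns true if one item was moved into the channel, false if there was nothing new.
  boolean poll() {
    String data = external.poll();
    if (data == null) {
      return false; // nothing new; the caller may back off
    }
    channel.add(data);
    return true;
  }
}

// Event-driven style: no polling; the external system invokes a callback when data arrives.
final class EventDrivenSourceSketch {
  private final List<String> channel;
  private boolean started;

  EventDrivenSourceSketch(List<String> channel) {
    this.channel = channel;
  }

  void start() { started = true; }
  void stop() { started = false; }

  // Callback handed to the hypothetical external client; data is ignored unless the source is started.
  Consumer<String> listener() {
    return data -> {
      if (started) {
        channel.add(data);
      }
    };
  }
}
```

<p class="c1"><span>In the pollable sketch the caller decides when work
happens, while in the event-driven sketch the external client does, so
only the pollable variant needs a process()-style method.</span></p>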
<p class="c1 c8"></p>
<h3 class="c3"><a name="h.bx0ouz5w3ofx" id=
"h.bx0ouz5w3ofx"></a><span>Channel</span></h3>
<p class="c3"><span>TBD</span></p>
<p class="c3 c8"></p>
<p class="c3 c8"></p>
</body>
</html>
